{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Fine-tune a PyTorch Lightning Text Classifier with Ray Data\n",
    "\n",
    "<a id=\"try-anyscale-quickstart-lightning_cola_advanced\" href=\"https://console.anyscale.com/register/ha?render_flow=ray&utm_source=ray_docs&utm_medium=docs&utm_campaign=lightning_cola_advanced\">\n",
    "    <img src=\"../../../_static/img/run-on-anyscale.svg\" alt=\"try-anyscale-quickstart\">\n",
    "</a>\n",
    "<br></br>\n",
    "\n",
    ":::{note}\n",
    "\n",
    "This intermediate example demonstrates how to use [Ray Data](https://docs.ray.io/en/latest/data/data.html) with PyTorch Lightning in Ray Train.\n",
    "\n",
    "If you just want to quickly convert your existing PyTorch Lightning scripts to Ray Train, refer to the [Lightning Quick Start Guide](train-pytorch-lightning).\n",
    "\n",
    ":::\n",
    "\n",
    "This demo shows how to fine-tune a text classifier on the [CoLA (Corpus of Linguistic Acceptability)](https://nyu-mll.github.io/CoLA/) dataset using a pre-trained BERT model. It follows three steps:\n",
    "- Preprocess the CoLA dataset with Ray Data.\n",
    "- Define a training function with PyTorch Lightning.\n",
    "- Launch distributed training with Ray Train's TorchTrainer."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the following command to install the necessary dependencies:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "tags": [
     "remove-cell"
    ]
   },
   "outputs": [],
   "source": [
    "SMOKE_TEST = True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install numpy datasets evaluate \"transformers>=4.19.1\" \"pytorch_lightning>=1.6.5\""
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Start by importing the needed libraries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:441: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\n",
      "  _torch_pytree._register_pytree_node(\n",
      "/home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:309: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\n",
      "  _torch_pytree._register_pytree_node(\n",
      "2025-07-09 16:06:28.571151: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.\n",
      "2025-07-09 16:06:28.619363: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n",
      "2025-07-09 16:06:28.619382: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n",
      "2025-07-09 16:06:28.620593: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n",
      "2025-07-09 16:06:28.628175: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n",
      "To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n",
      "2025-07-09 16:06:29.628216: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT\n"
     ]
    }
   ],
   "source": [
    "import ray\n",
    "import torch\n",
    "import numpy as np\n",
    "import pytorch_lightning as pl\n",
    "import torch.nn.functional as F\n",
    "from torch.utils.data import DataLoader, random_split\n",
    "from transformers import AutoTokenizer, AutoModelForSequenceClassification\n",
    "from datasets import load_dataset\n",
    "from evaluate import load"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pre-process CoLA Dataset\n",
    "\n",
    "CoLA is a dataset for binary sentence classification with roughly 10.6K sentences in total, about 8.5K of which are in the training split. First, download the dataset using the Hugging Face `datasets` API and create a Ray Dataset for the training and validation splits."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "196d64411a9643029163c6fa18c3e639",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Parquet Files Sample 0:   0%|          | 0.00/1.00 [00:00<?, ? file/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "9df067adb6344da1afa98536e6496457",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Parquet Files Sample 0:   0%|          | 0.00/1.00 [00:00<?, ? file/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "dataset = load_dataset(\"glue\", \"cola\")\n",
    "\n",
    "train_dataset = ray.data.from_huggingface(dataset[\"train\"])\n",
    "validation_dataset = ray.data.from_huggingface(dataset[\"validation\"])"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, tokenize the input sentences and pad the ID sequence to length 128 using the `bert-base-cased` tokenizer. The {meth}`map_batches <ray.data.Dataset.map_batches>` method applies this preprocessing function to all data samples."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/ray/anaconda3/lib/python3.9/site-packages/huggingface_hub/file_download.py:795: FutureWarning: `resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.\n",
      "  warnings.warn(\n"
     ]
    }
   ],
   "source": [
    "tokenizer = AutoTokenizer.from_pretrained(\"bert-base-cased\")\n",
    "\n",
    "def tokenize_sentence(batch):\n",
    "    outputs = tokenizer(\n",
    "        batch[\"sentence\"].tolist(),\n",
    "        max_length=128,\n",
    "        truncation=True,\n",
    "        padding=\"max_length\",\n",
    "        return_tensors=\"np\",\n",
    "    )\n",
    "    outputs[\"label\"] = batch[\"label\"]\n",
    "    return outputs\n",
    "\n",
    "train_dataset = train_dataset.map_batches(tokenize_sentence, batch_format=\"numpy\")\n",
    "validation_dataset = validation_dataset.map_batches(tokenize_sentence, batch_format=\"numpy\")"
   ]
  },
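  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the `max_length`, `truncation`, and `padding` arguments above concrete, the following toy sketch pads or truncates a list of token IDs to a fixed length and builds the matching attention mask. It isn't the Hugging Face implementation: `pad_or_truncate` is a hypothetical helper, the pad ID of `0` and the token IDs are assumptions for illustration, and the sketch ignores details such as special tokens.\n",
    "\n",
    "```python\n",
    "def pad_or_truncate(token_ids, max_length, pad_id=0):\n",
    "    # Hypothetical helper: keep at most max_length IDs, then pad on the right.\n",
    "    kept = list(token_ids[:max_length])\n",
    "    num_pad = max_length - len(kept)\n",
    "    input_ids = kept + [pad_id] * num_pad\n",
    "    # The attention mask is 1 for real tokens and 0 for padding.\n",
    "    attention_mask = [1] * len(kept) + [0] * num_pad\n",
    "    return input_ids, attention_mask\n",
    "\n",
    "\n",
    "# Made-up token IDs, padded to length 8 instead of 128 to keep the output short.\n",
    "input_ids, attention_mask = pad_or_truncate([101, 1188, 1110, 102], max_length=8)\n",
    "print(input_ids)\n",
    "print(attention_mask)\n",
    "```\n",
    "\n",
    "Padding every row to the same length gives each batch a fixed shape, which is what lets the tokenizer return rectangular NumPy arrays."
   ]
  },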
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define a PyTorch Lightning model\n",
    "\n",
    "You don't have to make any changes to your `LightningModule` definition. Just copy and paste your code here:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "class SentimentModel(pl.LightningModule):\n",
    "    def __init__(self, lr=2e-5, eps=1e-8):\n",
    "        super().__init__()\n",
    "        self.lr = lr\n",
    "        self.eps = eps\n",
    "        self.num_classes = 2\n",
    "        self.model = AutoModelForSequenceClassification.from_pretrained(\n",
    "            \"bert-base-cased\", num_labels=self.num_classes\n",
    "        )\n",
    "        self.metric = load(\"glue\", \"cola\")\n",
    "        self.predictions = []\n",
    "        self.references = []\n",
    "\n",
    "    def forward(self, batch):\n",
    "        input_ids, attention_mask = batch[\"input_ids\"], batch[\"attention_mask\"]\n",
    "        outputs = self.model(input_ids, attention_mask=attention_mask)\n",
    "        logits = outputs.logits\n",
    "        return logits\n",
    "\n",
    "    def training_step(self, batch, batch_idx):\n",
    "        labels = batch[\"label\"]\n",
    "        logits = self.forward(batch)\n",
    "        loss = F.cross_entropy(logits.view(-1, self.num_classes), labels)\n",
    "        self.log(\"train_loss\", loss)\n",
    "        return loss\n",
    "\n",
    "    def validation_step(self, batch, batch_idx):\n",
    "        labels = batch[\"label\"]\n",
    "        logits = self.forward(batch)\n",
    "        preds = torch.argmax(logits, dim=1)\n",
    "        self.predictions.append(preds)\n",
    "        self.references.append(labels)\n",
    "\n",
    "    def on_validation_epoch_end(self):\n",
    "        predictions = torch.concat(self.predictions).view(-1)\n",
    "        references = torch.concat(self.references).view(-1)\n",
    "        matthews_correlation = self.metric.compute(\n",
    "            predictions=predictions, references=references\n",
    "        )\n",
    "\n",
    "        # self.metric.compute() returns a dictionary:\n",
    "        # e.g. {\"matthews_correlation\": 0.53}\n",
    "        self.log_dict(matthews_correlation, sync_dist=True)\n",
    "        self.predictions.clear()\n",
    "        self.references.clear()\n",
    "\n",
    "    def configure_optimizers(self):\n",
    "        return torch.optim.AdamW(self.parameters(), lr=self.lr, eps=self.eps)"
   ]
  },
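  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `matthews_correlation` value that `on_validation_epoch_end` logs is the Matthews correlation coefficient, which ranges from -1 to 1. As a rough illustration of what the metric measures for binary labels, the following sketch computes it from confusion-matrix counts. `matthews_corrcoef_binary` is a hypothetical helper written for this explanation, not the code that the `evaluate` library runs, and returning `0.0` for a zero denominator is an assumed convention.\n",
    "\n",
    "```python\n",
    "import math\n",
    "\n",
    "\n",
    "def matthews_corrcoef_binary(predictions, references):\n",
    "    # Count the four confusion-matrix cells for labels in {0, 1}.\n",
    "    pairs = list(zip(predictions, references))\n",
    "    tp = sum(1 for p, r in pairs if p == 1 and r == 1)\n",
    "    tn = sum(1 for p, r in pairs if p == 0 and r == 0)\n",
    "    fp = sum(1 for p, r in pairs if p == 1 and r == 0)\n",
    "    fn = sum(1 for p, r in pairs if p == 0 and r == 1)\n",
    "    denominator = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))\n",
    "    if denominator == 0:\n",
    "        # Assumed convention: report 0.0 when the coefficient is undefined.\n",
    "        return 0.0\n",
    "    return (tp * tn - fp * fn) / denominator\n",
    "\n",
    "\n",
    "# Made-up predictions and labels, for illustration only.\n",
    "score = matthews_corrcoef_binary([1, 0, 1, 1], [1, 0, 0, 1])\n",
    "print({'matthews_correlation': score})\n",
    "```\n",
    "\n",
    "A score of 1 means every prediction matches its label, 0 means the predictions are no better than chance, and -1 means every prediction is wrong."
   ]
  },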
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define a training function\n",
    "\n",
    "Define a [training function](train-overview-training-function) that includes all of your Lightning training logic. {class}`TorchTrainer <ray.train.torch.TorchTrainer>` launches this function on each worker in parallel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ray.train\n",
    "from ray.train.lightning import (\n",
    "    prepare_trainer,\n",
    "    RayDDPStrategy,\n",
    "    RayLightningEnvironment,\n",
    "    RayTrainReportCallback,\n",
    ")\n",
    "\n",
    "train_func_config = {\n",
    "    \"lr\": 1e-5,\n",
    "    \"eps\": 1e-8,\n",
    "    \"batch_size\": 16,\n",
    "    \"max_epochs\": 5,\n",
    "}\n",
    "\n",
    "def train_func(config):\n",
    "    # Unpack the input configs passed from `TorchTrainer(train_loop_config)`\n",
    "    lr = config[\"lr\"]\n",
    "    eps = config[\"eps\"]\n",
    "    batch_size = config[\"batch_size\"]\n",
    "    max_epochs = config[\"max_epochs\"]\n",
    "\n",
    "    # Fetch the Dataset shards\n",
    "    train_ds = ray.train.get_dataset_shard(\"train\")\n",
    "    val_ds = ray.train.get_dataset_shard(\"validation\")\n",
    "\n",
    "    # Create a dataloader for Ray Datasets\n",
    "    train_ds_loader = train_ds.iter_torch_batches(batch_size=batch_size)\n",
    "    val_ds_loader = val_ds.iter_torch_batches(batch_size=batch_size)\n",
    "\n",
    "    # Model\n",
    "    model = SentimentModel(lr=lr, eps=eps)\n",
    "\n",
    "    trainer = pl.Trainer(\n",
    "        max_epochs=max_epochs,\n",
    "        accelerator=\"auto\",\n",
    "        devices=\"auto\",\n",
    "        strategy=RayDDPStrategy(),\n",
    "        plugins=[RayLightningEnvironment()],\n",
    "        callbacks=[RayTrainReportCallback()],\n",
    "        enable_progress_bar=False,\n",
    "    )\n",
    "\n",
    "    trainer = prepare_trainer(trainer)\n",
    "\n",
    "    trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To enable distributed training with Ray Train, configure the Lightning Trainer with the following utilities:\n",
    "\n",
    "- {class}`~ray.train.lightning.RayDDPStrategy`\n",
    "- {class}`~ray.train.lightning.RayLightningEnvironment`\n",
    "- {class}`~ray.train.lightning.RayTrainReportCallback`\n",
    "\n",
    "\n",
    "To ingest Ray Data with Lightning Trainer, follow these three steps:\n",
    "\n",
    "- Feed the full Ray dataset to Ray `TorchTrainer` (details in the next section).\n",
    "- Use {meth}`ray.train.get_dataset_shard <ray.train.get_dataset_shard>` to fetch the sharded dataset on each worker.\n",
    "- Use {meth}`ds.iter_torch_batches <ray.data.Dataset.iter_torch_batches>` to create a Ray data loader for Lightning Trainer.\n",
    "\n",
    ":::{seealso}\n",
    "\n",
    "- {ref}`Lightning Quick Start Guide <train-pytorch-lightning>`\n",
    "- {ref}`User Guides for Ray Data <data-ingest-torch>`\n",
    "\n",
    ":::"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "tags": [
     "remove-cell"
    ]
   },
   "outputs": [],
   "source": [
    "if SMOKE_TEST:\n",
    "    train_func_config[\"max_epochs\"] = 2\n",
    "    train_dataset = train_dataset.random_sample(0.1)\n",
    "    validation_dataset = validation_dataset.random_sample(0.1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Distributed training with Ray TorchTrainer\n",
    "\n",
    "Next, define a {class}`TorchTrainer <ray.train.torch.TorchTrainer>` to launch your training function on four GPU workers.\n",
    "\n",
    "You can pass the full Ray Datasets to the `datasets` argument of `TorchTrainer`. `TorchTrainer` automatically shards the datasets among the workers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from ray.train.torch import TorchTrainer\n",
    "from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig\n",
    "\n",
    "\n",
    "# Save the top-2 checkpoints according to the evaluation metric\n",
    "# The checkpoints and metrics are reported by `RayTrainReportCallback`\n",
    "run_config = RunConfig(\n",
    "    name=\"ptl-sent-classification\",\n",
    "    checkpoint_config=CheckpointConfig(\n",
    "        num_to_keep=2,\n",
    "        checkpoint_score_attribute=\"matthews_correlation\",\n",
    "        checkpoint_score_order=\"max\",\n",
    "    ),\n",
    ")\n",
    "\n",
    "# Schedule four workers for DDP training (1 GPU/worker by default)\n",
    "scaling_config = ScalingConfig(num_workers=4, use_gpu=True)\n",
    "\n",
    "trainer = TorchTrainer(\n",
    "    train_loop_per_worker=train_func,\n",
    "    train_loop_config=train_func_config,\n",
    "    scaling_config=scaling_config,\n",
    "    run_config=run_config,\n",
    "    datasets={\"train\": train_dataset, \"validation\": validation_dataset}, # <- Feed the Ray Datasets here\n",
    ")"
   ]
  },
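  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a mental model for the sharding, the following toy sketch splits a list of rows into equally sized shards, one per worker. It is an illustration under simplifying assumptions, not how Ray Data implements splitting: `equal_shards` is a hypothetical helper, it works on an in-memory list rather than a streaming dataset, and it simply drops leftover rows so that every worker sees the same number of rows.\n",
    "\n",
    "```python\n",
    "def equal_shards(rows, num_workers):\n",
    "    # Hypothetical helper: give every worker the same number of rows,\n",
    "    # dropping any remainder so that all shards have equal length.\n",
    "    rows_per_worker = len(rows) // num_workers\n",
    "    return [\n",
    "        rows[rank * rows_per_worker : (rank + 1) * rows_per_worker]\n",
    "        for rank in range(num_workers)\n",
    "    ]\n",
    "\n",
    "\n",
    "# Ten made-up row IDs split across four workers.\n",
    "shards = equal_shards(list(range(10)), num_workers=4)\n",
    "for rank, shard in enumerate(shards):\n",
    "    print(rank, shard)\n",
    "```\n",
    "\n",
    "Equal shard sizes matter for DDP because the workers synchronize gradients at every step, so a worker with extra batches could leave the others waiting."
   ]
  },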
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-07-09 16:06:43,377\tINFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:06:43 (running for 00:00:00.11)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/node-group:head)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 PENDING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:441: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m   _torch_pytree._register_pytree_node(\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:309: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m   _torch_pytree._register_pytree_node(\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:06:48 (running for 00:00:05.13)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/node-group:head)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 PENDING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m 2025-07-09 16:06:51.068628: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m 2025-07-09 16:06:51.116629: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m 2025-07-09 16:06:51.116652: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m 2025-07-09 16:06:51.117931: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m 2025-07-09 16:06:51.125011: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n",
      "\u001b[36m(TrainTrainable pid=47169)\u001b[0m 2025-07-09 16:06:52.119328: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:06:53 (running for 00:00:10.16)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/accelerator_shape:4xT4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 PENDING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m Setting up process group for: env:// [rank=0, world_size=4]\n",
      "\u001b[36m(TorchTrainer pid=47169)\u001b[0m Started distributed worker processes: \n",
      "\u001b[36m(TorchTrainer pid=47169)\u001b[0m - (node_id=f67b5f412a227b4c6b3ddd85d6f5b1eecd0bd0917efa8f9cd4b5e4da, ip=10.0.114.132, pid=47314) world_rank=0, local_rank=0, node_rank=0\n",
      "\u001b[36m(TorchTrainer pid=47169)\u001b[0m - (node_id=f67b5f412a227b4c6b3ddd85d6f5b1eecd0bd0917efa8f9cd4b5e4da, ip=10.0.114.132, pid=47313) world_rank=1, local_rank=1, node_rank=0\n",
      "\u001b[36m(TorchTrainer pid=47169)\u001b[0m - (node_id=f67b5f412a227b4c6b3ddd85d6f5b1eecd0bd0917efa8f9cd4b5e4da, ip=10.0.114.132, pid=47316) world_rank=2, local_rank=2, node_rank=0\n",
      "\u001b[36m(TorchTrainer pid=47169)\u001b[0m - (node_id=f67b5f412a227b4c6b3ddd85d6f5b1eecd0bd0917efa8f9cd4b5e4da, ip=10.0.114.132, pid=47321) world_rank=3, local_rank=3, node_rank=0\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:06:58 (running for 00:00:15.19)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/accelerator_shape:4xT4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:441: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m   _torch_pytree._register_pytree_node(\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 2025-07-09 16:07:03.237463: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 2025-07-09 16:07:03.285818: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 2025-07-09 16:07:03.285846: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 2025-07-09 16:07:03.287089: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 2025-07-09 16:07:03.294281: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:03 (running for 00:00:20.21)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/provider:aws, 0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/region:us-west-2)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 2025-07-09 16:07:04.341505: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT\n",
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:441: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\u001b[32m [repeated 9x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)\u001b[0m\n",
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m   _torch_pytree._register_pytree_node(\u001b[32m [repeated 9x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/huggingface_hub/file_download.py:795: FutureWarning: `resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m   warnings.warn(\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.weight', 'classifier.bias']\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/ray/train/lightning/_lightning_utils.py:262: RayDeprecationWarning: This API is deprecated and may be removed in future Ray releases. You could suppress this warning by setting env variable PYTHONWARNINGS=\"ignore::DeprecationWarning\"\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m `get_trial_name` is deprecated because the concept of a `Trial` will soon be removed in Ray Train.Ray Train will no longer assume that it's running within a Ray Tune `Trial` in the future. See this issue for more context and migration options: https://github.com/ray-project/ray/issues/49454. Disable these warnings by setting the environment variable: RAY_TRAIN_ENABLE_V2_MIGRATION_WARNINGS=0\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m   self.trial_name = train.get_context().get_trial_name()\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m GPU available: True (cuda), used: True\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m TPU available: False, using: 0 TPU cores\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m IPU available: False, using: 0 IPUs\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m HPU available: False, using: 0 HPUs\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m Missing logger folder: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/working_dirs/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/lightning_logs\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3]\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m \n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m   | Name  | Type                          | Params\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m --------------------------------------------------------\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 0 | model | BertForSequenceClassification | 108 M \n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m --------------------------------------------------------\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 108 M     Trainable params\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 0         Non-trainable params\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 108 M     Total params\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m 433.247   Total estimated model params size (MB)\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Registered dataset logger for dataset validation_40_0\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Starting execution of Dataset validation_40_0. Full logs are in /tmp/ray/session_2025-07-09_15-09-59_163606_3385/logs/ray-data\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Execution plan of Dataset validation_40_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)->MapBatches(random_sample)] -> OutputSplitter[split(4, equal=True)]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "66f35c7963ae4454a7c3c994e74c4931",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "56f89118e64b4fddb4f7c396982bb681",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - ReadParquet->SplitBlocks(96) 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "805fb314d7804299a18666be5963561f",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - MapBatches(tokenize_sentence)->MapBatches(random_sample) 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "7476884ca79143508e60cfe86fbce086",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - split(4, equal=True) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:08 (running for 00:00:25.23)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/provider:aws, 0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/region:us-west-2)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n",
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:13 (running for 00:00:30.26)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 accelerator_type:T4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m 2025-07-09 16:07:03.305020: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m 2025-07-09 16:07:03.353280: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m 2025-07-09 16:07:03.353303: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m 2025-07-09 16:07:03.354507: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m 2025-07-09 16:07:03.361526: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m 2025-07-09 16:07:04.397838: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(tokenize_sentence)->MapBatches(random_sample) pid=48062)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:441: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\u001b[32m [repeated 5x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(tokenize_sentence)->MapBatches(random_sample) pid=48062)\u001b[0m   _torch_pytree._register_pytree_node(\u001b[32m [repeated 5x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/huggingface_hub/file_download.py:795: FutureWarning: `resume_download` is deprecated and will be removed in version 1.0.0. Downloads always resume when possible. If you want to force a new download, use `force_download=True`.\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m   warnings.warn(\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47316)\u001b[0m Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']\u001b[32m [repeated 2x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47316)\u001b[0m You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/ray/train/lightning/_lightning_utils.py:262: RayDeprecationWarning: This API is deprecated and may be removed in future Ray releases. You could suppress this warning by setting env variable PYTHONWARNINGS=\"ignore::DeprecationWarning\"\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m `get_trial_name` is deprecated because the concept of a `Trial` will soon be removed in Ray Train.Ray Train will no longer assume that it's running within a Ray Tune `Trial` in the future. See this issue for more context and migration options: https://github.com/ray-project/ray/issues/49454. Disable these warnings by setting the environment variable: RAY_TRAIN_ENABLE_V2_MIGRATION_WARNINGS=0\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m   self.trial_name = train.get_context().get_trial_name()\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47313)\u001b[0m Missing logger folder: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/working_dirs/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/lightning_logs\u001b[32m [repeated 3x across cluster]\u001b[0m\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m LOCAL_RANK: 3 - CUDA_VISIBLE_DEVICES: [0,1,2,3]\u001b[32m [repeated 3x across cluster]\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:18 (running for 00:00:35.29)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 accelerator_type:T4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m ✔️  Dataset validation_40_0 execution finished in 11.01 seconds\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "c4e0b6f3679e45c0ade10ebfec3870c5",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "0774df975e1547e7aec3953f8f4126f8",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) - ReadParquet->SplitBlocks(96) 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "6398890fc05e4920970ab3c292d66028",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) - MapBatches(tokenize_sentence)->MapBatches(random_sample) 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "da20605e6db14c8e98edfe1b1736333c",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) - split(4, equal=True) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m Registered dataset logger for dataset train_39_0\n",
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m Starting execution of Dataset train_39_0. Full logs are in /tmp/ray/session_2025-07-09_15-09-59_163606_3385/logs/ray-data\n",
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m Execution plan of Dataset train_39_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)->MapBatches(random_sample)] -> OutputSplitter[split(4, equal=True)]\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m [rank0]:[W reducer.cpp:1389] Warning: find_unused_parameters=True was specified in DDP constructor, but did not find any unused parameters in the forward pass. This flag results in an extra traversal of the autograd graph every iteration,  which can adversely affect performance. If your model indeed never has any unused parameters in the forward pass, consider turning this flag off. Note that this warning may be a false positive if your model has flow control causing later iterations to have unused parameters. (function operator())\n",
      "\u001b[36m(MapBatches(tokenize_sentence)->MapBatches(random_sample) pid=50017)\u001b[0m /home/ray/anaconda3/lib/python3.9/site-packages/transformers/utils/generic.py:441: UserWarning: torch.utils._pytree._register_pytree_node is deprecated. Please use torch.utils._pytree.register_pytree_node instead.\u001b[32m [repeated 46x across cluster]\u001b[0m\n",
      "\u001b[36m(MapBatches(tokenize_sentence)->MapBatches(random_sample) pid=50017)\u001b[0m   _torch_pytree._register_pytree_node(\u001b[32m [repeated 46x across cluster]\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:23 (running for 00:00:40.31)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 accelerator_type:T4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m ✔️  Dataset train_39_0 execution finished in 2.76 seconds\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Registered dataset logger for dataset validation_40_1\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Starting execution of Dataset validation_40_1. Full logs are in /tmp/ray/session_2025-07-09_15-09-59_163606_3385/logs/ray-data\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Execution plan of Dataset validation_40_1: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)->MapBatches(random_sample)] -> OutputSplitter[split(4, equal=True)]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "69c836df9d0242aa9fc2070cb7bbc0d9",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "f661bd51476c4a83814d8b20e98d7214",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - ReadParquet->SplitBlocks(96) 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "9f5dcd3282eb4b008c50ef16015c7c6d",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - MapBatches(tokenize_sentence)->MapBatches(random_sample) 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "29d67ea47c614625929a15a34525ddc3",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - split(4, equal=True) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m ✔️  Dataset validation_40_1 execution finished in 2.42 seconds\n",
      "\u001b[36m(RayTrainWorker pid=47321)\u001b[0m [rank3]:[W reducer.cpp:1389] Warning: find_unused_parameters=True was specified in DDP constructor, but did not find any unused parameters in the forward pass. This flag results in an extra traversal of the autograd graph every iteration,  which can adversely affect performance. If your model indeed never has any unused parameters in the forward pass, consider turning this flag off. Note that this warning may be a false positive if your model has flow control causing later iterations to have unused parameters. (function operator())\u001b[32m [repeated 3x across cluster]\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:28 (running for 00:00:45.34)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 accelerator_type:T4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(RayTrainWorker pid=47316)\u001b[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/ray/ray_results/ptl-sent-classification/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/checkpoint_000000)\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:33 (running for 00:00:50.38)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/region:us-west-2)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "3cc9de40efff4bbf8d81672352ec7429",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m Registered dataset logger for dataset train_39_1\n",
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m Starting execution of Dataset train_39_1. Full logs are in /tmp/ray/session_2025-07-09_15-09-59_163606_3385/logs/ray-data\n",
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m Execution plan of Dataset train_39_1: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)->MapBatches(random_sample)] -> OutputSplitter[split(4, equal=True)]\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/ray/ray_results/ptl-sent-classification/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/checkpoint_000000)\u001b[32m [repeated 3x across cluster]\u001b[0m\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "b0bb61197c954441a778f09b6fde1c7f",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) - ReadParquet->SplitBlocks(96) 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "c4b2264b99b447348eb967a2e2fe90dd",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) - MapBatches(tokenize_sentence)->MapBatches(random_sample) 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "2d26a79014fd4907bce9482bf74f94fe",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47667) - split(4, equal=True) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:38 (running for 00:00:55.41)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/region:us-west-2)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47667)\u001b[0m ✔️  Dataset train_39_1 execution finished in 3.11 seconds\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Registered dataset logger for dataset validation_40_2\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Starting execution of Dataset validation_40_2. Full logs are in /tmp/ray/session_2025-07-09_15-09-59_163606_3385/logs/ray-data\n",
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m Execution plan of Dataset validation_40_2: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)->MapBatches(random_sample)] -> OutputSplitter[split(4, equal=True)]\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "9f7033745bbf4abfa0aa322e66f9605c",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) Running 0: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "be1e14f319ad4a6cbed5e56e0461a5c5",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - ReadParquet->SplitBlocks(96) 1: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "bd05af9c7e104f00b938f193f4b8ac6e",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - MapBatches(tokenize_sentence)->MapBatches(random_sample) 2: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "2fb0d2c757fd4a10816eec6bf4f06f98",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "(pid=47666) - split(4, equal=True) 3: 0.00 row [00:00, ? row/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:43 (running for 00:01:00.43)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/accelerator_shape:4xT4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(SplitCoordinator pid=47666)\u001b[0m ✔️  Dataset validation_40_2 execution finished in 2.39 seconds\n",
      "\u001b[36m(RayTrainWorker pid=47316)\u001b[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/ray/ray_results/ptl-sent-classification/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/checkpoint_000001)\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:48 (running for 00:01:05.46)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 accelerator_type:T4, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/accelerator_shape:4xT4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m `Trainer.fit` stopped: `max_epochs=2` reached.\n",
      "\u001b[36m(RayTrainWorker pid=47314)\u001b[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/ray/ray_results/ptl-sent-classification/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/checkpoint_000001)\u001b[32m [repeated 3x across cluster]\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:53 (running for 00:01:10.49)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 accelerator_type:T4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 RUNNING)\n",
      "\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-07-09 16:07:54,970\tINFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/ray/ray_results/ptl-sent-classification' in 0.0022s.\n",
      "2025-07-09 16:07:54,972\tINFO tune.py:1041 -- Total run time: 71.59 seconds (71.58 seconds for the tuning loop).\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "== Status ==\n",
      "Current time: 2025-07-09 16:07:54 (running for 00:01:11.59)\n",
      "Using FIFO scheduling algorithm.\n",
      "Logical resource usage: 1.0/48 CPUs, 4.0/4 GPUs (0.0/1.0 anyscale/accelerator_shape:4xT4, 0.0/1.0 anyscale/provider:aws, 0.0/1.0 anyscale/node-group:head, 0.0/1.0 anyscale/region:us-west-2, 0.0/1.0 accelerator_type:T4)\n",
      "Result logdir: /tmp/ray/session_2025-07-09_15-09-59_163606_3385/artifacts/2025-07-09_16-06-43/ptl-sent-classification/driver_artifacts\n",
      "Number of trials: 1/1 (1 TERMINATED)\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "result = trainer.fit()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ":::{note}\n",
    "This example uses Ray Data for data ingestion to speed up preprocessing, but you can also continue to use the native PyTorch `DataLoader` or `LightningDataModule`. See {doc}`Train a PyTorch Lightning Image Classifier <lightning_mnist_example>`.\n",
    "\n",
    ":::"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Result(\n",
       "  metrics={'train_loss': 0.8652846813201904, 'matthews_correlation': 0.0, 'epoch': 1, 'step': 28},\n",
       "  path='/home/ray/ray_results/ptl-sent-classification/TorchTrainer_61240_00000_0_2025-07-09_16-06-43',\n",
       "  filesystem='local',\n",
       "  checkpoint=Checkpoint(filesystem=local, path=/home/ray/ray_results/ptl-sent-classification/TorchTrainer_61240_00000_0_2025-07-09_16-06-43/checkpoint_000001)\n",
       ")"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "result"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## See also\n",
    "\n",
    "* [Ray Train Examples](../../examples) for more use cases\n",
    "\n",
    "* [Ray Train User Guides](train-user-guides) for how-to guides"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "build",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.23"
  },
  "orphan": true,
  "vscode": {
   "interpreter": {
    "hash": "178108d354ddc93ba36c4b7bfc5283800982aac0e7ca92cc0cf312ad1b8f8b20"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
